{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Introduction to XGBoost Spark3.1 with GPU\n",
    "\n",
    "Taxi is an example of xgboost regressor. This notebook will show you how to load data, train the xgboost model and use this model to predict \"fare_amount\" of your taxi trip.\n",
    "\n",
    "A few libraries required for this notebook:\n",
    "  1. cudf-cu11\n",
    "  2. xgboost\n",
    "  3. scikit-learn\n",
    "  4. numpy\n",
    "\n",
    "This notebook also illustrates the ease of porting a sample CPU based Spark xgboost4j code into GPU. There is no change required for running Spark XGBoost on GPU because both CPU and GPU call the same API. For CPU run, we need to vectorize the trained dataset before fitting data to regressor."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Import Required Libraries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from xgboost.spark import SparkXGBRegressor, SparkXGBRegressorModel\n",
    "from pyspark.ml.evaluation import RegressionEvaluator\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.types import FloatType, IntegerType, StructField, StructType\n",
    "from time import time\n",
    "from pyspark.conf import SparkConf\n",
    "import os\n",
    "# if you pass/unpack the archive file and enable the environment\n",
    "# os.environ['PYSPARK_PYTHON'] = \"./environment/bin/python\"\n",
    "# os.environ['PYSPARK_DRIVER_PYTHON'] = \"./environment/bin/python\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Besides CPU version requires two extra libraries.\n",
    "```Python\n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "from pyspark.sql.functions import col\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create Spark Session and Data Reader"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2022-11-30 07:51:19,104 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "2022-11-30 07:51:19,480 WARN resource.ResourceUtils: The configuration of cores (exec = 2 task = 1, runnable tasks = 2) will result in wasted resources due to resource gpu limiting the number of runnable tasks per executor to: 1. Please adjust your configuration.\n",
      "2022-11-30 07:51:33,277 WARN rapids.RapidsPluginUtils: RAPIDS Accelerator 25.02.1 using cudf 25.02.1.\n",
      "2022-11-30 07:51:33,292 WARN rapids.RapidsPluginUtils: spark.rapids.sql.multiThreadedRead.numThreads is set to 20.\n",
      "2022-11-30 07:51:33,295 WARN rapids.RapidsPluginUtils: RAPIDS Accelerator is enabled, to disable GPU support set `spark.rapids.sql.enabled` to false.\n",
      "2022-11-30 07:51:33,295 WARN rapids.RapidsPluginUtils: spark.rapids.sql.explain is set to `NOT_ON_GPU`. Set it to 'NONE' to suppress the diagnostics logging about the query placement on the GPU.\n",
      "2022-11-30 07:51:33,798 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.\n"
     ]
    }
   ],
   "source": [
    "SPARK_MASTER_URL = os.getenv(\"SPARK_MASTER_URL\", \"/your-url\")\n",
    "\n",
    "RAPIDS_JAR = os.getenv(\"RAPIDS_JAR\", \"/your-jar-path\")\n",
    "\n",
    "# You need to update with your real hardware resource \n",
    "driverMem = os.getenv(\"DRIVER_MEM\", \"2g\")\n",
    "executorMem = os.getenv(\"EXECUTOR_MEM\", \"2g\")\n",
    "pinnedPoolSize = os.getenv(\"PINNED_POOL_SIZE\", \"2g\")\n",
    "concurrentGpuTasks = os.getenv(\"CONCURRENT_GPU_TASKS\", \"2\")\n",
    "executorCores = int(os.getenv(\"EXECUTOR_CORES\", \"2\"))\n",
    "# Common spark settings\n",
    "conf = SparkConf()\n",
    "conf.setMaster(SPARK_MASTER_URL)\n",
    "conf.setAppName(\"Microbenchmark on GPU\")\n",
    "conf.set(\"spark.executor.instances\",\"1\")\n",
    "conf.set(\"spark.driver.memory\", driverMem)\n",
    "## The tasks will run on GPU memory, so there is no need to set a high host memory\n",
    "conf.set(\"spark.executor.memory\", executorMem)\n",
    "## The tasks will run on GPU cores, so there is no need to use many cpu cores\n",
    "conf.set(\"spark.executor.cores\", executorCores)\n",
    "\n",
    "# Plugin settings\n",
    "conf.set(\"spark.executor.resource.gpu.amount\", \"1\")\n",
    "conf.set(\"spark.rapids.sql.concurrentGpuTasks\", concurrentGpuTasks)\n",
    "conf.set(\"spark.rapids.memory.pinnedPool.size\", pinnedPoolSize)\n",
    "# since pyspark and xgboost share the same GPU, we need to allocate some memory to xgboost to avoid GPU OOM while training \n",
    "conf.set(\"spark.rapids.memory.gpu.allocFraction\",\"0.7\")\n",
    "conf.set(\"spark.locality.wait\",\"0\")\n",
    "##############note: only support value=1 https://github.com/dmlc/xgboost/blame/master/python-package/xgboost/spark/core.py#L370-L374\n",
    "conf.set(\"spark.task.resource.gpu.amount\", 1) \n",
    "conf.set(\"spark.rapids.sql.enabled\", \"true\") \n",
    "conf.set(\"spark.plugins\", \"com.nvidia.spark.SQLPlugin\")\n",
    "conf.set(\"spark.sql.cache.serializer\",\"com.nvidia.spark.ParquetCachedBatchSerializer\")\n",
    "conf.set(\"spark.sql.execution.arrow.maxRecordsPerBatch\", 200000) \n",
    "conf.set(\"spark.driver.extraClassPath\", RAPIDS_JAR)\n",
    "conf.set(\"spark.executor.extraClassPath\", RAPIDS_JAR)\n",
    "\n",
    "# if you pass/unpack the archive file and enable the environment\n",
    "# conf.set(\"spark.yarn.dist.archives\", \"your_pyspark_venv.tar.gz#environment\")\n",
    "# Create spark session\n",
    "spark = SparkSession.builder.config(conf=conf).getOrCreate()\n",
    "\n",
    "reader = spark.read"
   ]
  },
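  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `ResourceUtils` warning in the output above appears because each executor has 2 cores but only 1 GPU, and each task requests a whole GPU, so only one task can run per executor at a time. The sketch below reproduces that arithmetic under a simplified model of Spark resource scheduling. The function name `runnable_tasks_per_executor` is hypothetical and is not part of any Spark API, and `spark.task.cpus` is assumed to be at its default of 1.\n",
    "\n",
    "```Python\n",
    "import math\n",
    "\n",
    "def runnable_tasks_per_executor(executor_cores, task_cpus, executor_gpus, task_gpu_amount):\n",
    "    # Slots allowed by CPU cores: spark.executor.cores / spark.task.cpus\n",
    "    cpu_slots = executor_cores // task_cpus\n",
    "    # Slots allowed by GPUs: spark.executor.resource.gpu.amount / spark.task.resource.gpu.amount\n",
    "    gpu_slots = math.floor(executor_gpus / task_gpu_amount)\n",
    "    # The scarcer resource limits how many tasks can run concurrently\n",
    "    return min(cpu_slots, gpu_slots)\n",
    "\n",
    "# Values used in this notebook: 2 cores, 1 CPU per task, 1 GPU, 1 GPU per task\n",
    "slots = runnable_tasks_per_executor(2, 1, 1, 1)\n",
    "print(slots)\n",
    "```\n",
    "\n",
    "With these settings the GPU is the limiting resource, so one of the two executor cores stays idle during GPU stages, which is what the warning reports."
   ]
  },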
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Specify the Data Schema and Load the Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "label = 'fare_amount'\n",
    "schema = StructType([\n",
    "    StructField('vendor_id', FloatType()),\n",
    "    StructField('passenger_count', FloatType()),\n",
    "    StructField('trip_distance', FloatType()),\n",
    "    StructField('pickup_longitude', FloatType()),\n",
    "    StructField('pickup_latitude', FloatType()),\n",
    "    StructField('rate_code', FloatType()),\n",
    "    StructField('store_and_fwd', FloatType()),\n",
    "    StructField('dropoff_longitude', FloatType()),\n",
    "    StructField('dropoff_latitude', FloatType()),\n",
    "    StructField(label, FloatType()),\n",
    "    StructField('hour', FloatType()),\n",
    "    StructField('year', IntegerType()),\n",
    "    StructField('month', IntegerType()),\n",
    "    StructField('day', FloatType()),\n",
    "    StructField('day_of_week', FloatType()),\n",
    "    StructField('is_weekend', FloatType()),\n",
    "])\n",
    "features = [ x.name for x in schema if x.name != label ]\n",
    "\n",
    "# You need to update them to your real paths!\n",
    "dataRoot = os.getenv(\"DATA_ROOT\", \"/data\")\n",
    "train_path = dataRoot + \"/taxi/csv/train\"\n",
    "eval_path = dataRoot + \"/taxi/csv/test\"\n",
    "\n",
    "data_format = 'csv'\n",
    "has_header = 'true'\n",
    "if data_format == 'csv':\n",
    "    train_data = reader.schema(schema).option('header',has_header).csv(train_path)\n",
    "    trans_data = reader.schema(schema).option('header',has_header).csv(eval_path)\n",
    "else :\n",
    "    train_data = reader.load(train_path)\n",
    "    trans_data = reader.load(eval_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note on CPU version, vectorization is required before fitting data to regressor, which means you need to assemble all feature columns into one column.\n",
    "\n",
    "```Python\n",
    "def vectorize(data_frame):\n",
    "    to_floats = [ col(x.name).cast(FloatType()) for x in data_frame.schema ]\n",
    "    return (VectorAssembler()\n",
    "        .setInputCols(features)\n",
    "        .setOutputCol('features')\n",
    "        .transform(data_frame.select(to_floats))\n",
    "        .select(col('features'), col(label)))\n",
    "\n",
    "train_data = vectorize(train_data)\n",
    "trans_data = vectorize(trans_data)\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create a XGBoostRegressor"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "params = { \n",
    "    \"tree_method\": \"gpu_hist\",\n",
    "    \"grow_policy\": \"depthwise\",\n",
    "    \"num_workers\": 1,\n",
    "    \"device\": \"cuda\",\n",
    "}\n",
    "params['features_col'] = features\n",
    "params['label_col'] = label\n",
    "    \n",
    "regressor = SparkXGBRegressor(**params)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The parameter `num_workers` should be set to the number of GPUs in Spark cluster for GPU version, while for CPU version it is usually equal to the number of the CPU cores.\n",
    "\n",
    "Concerning the tree method, GPU version only supports `gpu_hist` currently, while `hist` is designed and used here for CPU training.\n",
    "\n",
    "An example of CPU classifier:\n",
    "```\n",
    "classifier = SparkXGBClassifier(\n",
    "  feature_col=features,\n",
    "  label_col=label,  \n",
    "  num_workers=1024,\n",
    "  use_gpu=False,\n",
    ")\n",
    "```"
   ]
  },
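  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The guidance above can be sketched as a small helper that picks `num_workers` and `tree_method` for each mode. The helper name `xgb_params` and its arguments are hypothetical, and the values simply mirror the ones used in this notebook, so check them against the XGBoost version you have installed.\n",
    "\n",
    "```Python\n",
    "def xgb_params(use_gpu, num_gpus=1, num_cpu_cores=1):\n",
    "    # GPU: one worker per GPU in the cluster, with the gpu_hist tree method\n",
    "    if use_gpu:\n",
    "        return {'tree_method': 'gpu_hist', 'num_workers': num_gpus}\n",
    "    # CPU: workers usually match the number of CPU cores, with the hist tree method\n",
    "    return {'tree_method': 'hist', 'num_workers': num_cpu_cores}\n",
    "\n",
    "gpu_params = xgb_params(use_gpu=True, num_gpus=1)\n",
    "cpu_params = xgb_params(use_gpu=False, num_cpu_cores=8)\n",
    "print(gpu_params)\n",
    "print(cpu_params)\n",
    "```\n",
    "\n",
    "The resulting dictionary could then be extended with `features_col` and `label_col` and unpacked into `SparkXGBRegressor(**params)`, as in the regressor cell above."
   ]
  },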
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Train the Data with Benchmark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "If features_cols param set, then features_col param is ignored.\n",
      "[Stage 2:>                                                          (0 + 1) / 1]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Training takes 24.12 seconds\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "                                                                                \r",
      "/data/home/yuanli/work/reviews/pr252/pyspark_venv_20221125/lib/python3.8/site-packages/xgboost/sklearn.py:808: UserWarning: Loading a native XGBoost model with Scikit-Learn interface.\n",
      "  warnings.warn(\"Loading a native XGBoost model with Scikit-Learn interface.\")\n"
     ]
    }
   ],
   "source": [
    "def with_benchmark(phrase, action):\n",
    "    start = time()\n",
    "    result = action()\n",
    "    end = time()\n",
    "    print('{} takes {} seconds'.format(phrase, round(end - start, 2)))\n",
    "    return result\n",
    "model = with_benchmark('Training', lambda: regressor.fit(train_data))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Save and Reload the Model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "If features_cols param set, then features_col param is ignored.\n"
     ]
    }
   ],
   "source": [
    "model.write().overwrite().save(dataRoot + '/model/taxi')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "loaded_model = SparkXGBRegressorModel().load(dataRoot + '/model/taxi')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Transformation and Show Result Sample"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "scrolled": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2022-11-30 07:52:27,357 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Transformation takes 0.93 seconds\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2022-11-30 07:52:28,189 WARN rapids.GpuOverrides: \n",
      "!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n",
      "  @Partitioning <SinglePartition$> could run on GPU\n",
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------+---------------+-------------+-----------+-----------+\n",
      "|     vendor_id|passenger_count|trip_distance|fare_amount| prediction|\n",
      "+--------------+---------------+-------------+-----------+-----------+\n",
      "|1.559730432E09|            2.0|  0.699999988|        5.0|5.046935558|\n",
      "|1.559730432E09|            3.0|  10.69999981|       34.0|31.72706413|\n",
      "|1.559730432E09|            1.0|  2.299999952|       10.0|9.294451714|\n",
      "|1.559730432E09|            1.0|  4.400000095|       16.5|15.05233097|\n",
      "|1.559730432E09|            1.0|          1.5|        7.0|8.995832443|\n",
      "+--------------+---------------+-------------+-----------+-----------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "def transform():\n",
    "    result = loaded_model.transform(trans_data).cache()\n",
    "    result.foreachPartition(lambda _: None)\n",
    "    return result\n",
    "result = with_benchmark('Transformation', transform)\n",
    "result.select('vendor_id', 'passenger_count', 'trip_distance', label, 'prediction').show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note on CPU version: You cannot `select` the feature columns after vectorization. So please use `result.show(5)` instead."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Evaluation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Evaluation takes 0.22 seconds\n",
      "RMSE is 1.9141528471228921\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2022-11-30 07:52:28,580 WARN rapids.GpuOverrides: \n",
      "! <DeserializeToObjectExec> cannot run on GPU because not all expressions can be replaced; GPU does not currently support the operator class org.apache.spark.sql.execution.DeserializeToObjectExec\n",
      "  ! <CreateExternalRow> createexternalrow(prediction#87, fare_amount#728, 1.0#729, StructField(prediction,DoubleType,true), StructField(fare_amount,DoubleType,true), StructField(1.0,DoubleType,false)) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow\n",
      "    @Expression <AttributeReference> prediction#87 could run on GPU\n",
      "    @Expression <AttributeReference> fare_amount#728 could run on GPU\n",
      "    @Expression <AttributeReference> 1.0#729 could run on GPU\n",
      "  !Expression <AttributeReference> obj#733 cannot run on GPU because expression AttributeReference obj#733 produces an unsupported type ObjectType(interface org.apache.spark.sql.Row)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "accuracy = with_benchmark(\n",
    "    'Evaluation',\n",
    "    lambda: RegressionEvaluator().setLabelCol(label).evaluate(result))\n",
    "print('RMSE is ' + str(accuracy))"
   ]
  },
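  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`RegressionEvaluator` reports the root mean squared error (RMSE) by default. As a rough illustration of what that number means, the sketch below computes RMSE in plain Python over the five sample rows printed in the transformation step. The helper `root_mean_squared_error` is hypothetical, and five rows are far too few to reproduce the evaluation result for the full dataset.\n",
    "\n",
    "```Python\n",
    "import math\n",
    "\n",
    "def root_mean_squared_error(labels, predictions):\n",
    "    # Square root of the mean squared difference between labels and predictions\n",
    "    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]\n",
    "    return math.sqrt(sum(squared_errors) / len(squared_errors))\n",
    "\n",
    "# fare_amount and prediction values copied from the five sample rows shown earlier\n",
    "labels = [5.0, 34.0, 10.0, 16.5, 7.0]\n",
    "predictions = [5.046935558, 31.72706413, 9.294451714, 15.05233097, 8.995832443]\n",
    "sample_rmse = root_mean_squared_error(labels, predictions)\n",
    "print('RMSE over the sample rows is ' + str(round(sample_rmse, 2)))\n",
    "```\n",
    "\n",
    "Because errors are squared before averaging, a single large miss (such as the 34.0 fare predicted as about 31.7) contributes more to RMSE than several small ones."
   ]
  },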
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Stop"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
